3f2d5bea20ce647d6fff64a9b2d7a488b005f0c9,ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java,TestActionScheduler,testExclusiveRequests,#,2277
Before Change
// Build four spied stages and register each one in stagesInProgress as soon as
// it exists, so the list order matches the creation order (1, 2, 3, 4).
// The two numeric literals before the request id are presumably the task id
// and the stage id expected by getStageWithSingleTask -- confirm against the helper.

// Request 1: NAMENODE START on hostname1.
Stage stageInProgress1 = spy(
    getStageWithSingleTask(hostname1, "cluster1", Role.NAMENODE, RoleCommand.START,
        Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1));
stagesInProgress.add(stageInProgress1);

// Request 1: DATANODE START on hostname1.
Stage stageInProgress2 = spy(
    getStageWithSingleTask(hostname1, "cluster1", Role.DATANODE, RoleCommand.START,
        Service.Type.HDFS, 2, 2, (int) requestId1));
stagesInProgress.add(stageInProgress2);

// Request 2: DATANODE STOP on hostname2 -- this is the stage of the exclusive request.
Stage stageInProgress3 = spy(
    getStageWithSingleTask(hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP,
        Service.Type.HDFS, 3, 3, (int) requestId2));
stagesInProgress.add(stageInProgress3);

// Request 3: DATANODE START on hostname3.
Stage stageInProgress4 = spy(
    getStageWithSingleTask(hostname3, "cluster1", Role.DATANODE, RoleCommand.START,
        Service.Type.HDFS, 4, 4, (int) requestId3));
stagesInProgress.add(stageInProgress4);
// Three host mocks that look identical to the code under test: each reports
// HEALTHY, the shared `hostname` value and the same last-registration time.
Host host1 = mock(Host.class);
when(host1.getState()).thenReturn(HostState.HEALTHY);
when(host1.getHostName()).thenReturn(hostname);
when(host1.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME);

Host host2 = mock(Host.class);
when(host2.getState()).thenReturn(HostState.HEALTHY);
when(host2.getHostName()).thenReturn(hostname);
when(host2.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME);

Host host3 = mock(Host.class);
when(host3.getState()).thenReturn(HostState.HEALTHY);
when(host3.getHostName()).thenReturn(hostname);
when(host3.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME);

// Previously fsm.getHost(anyString()) was stubbed three times in a row with the
// same matcher; only the last stubbing (host3) was ever served, leaving host1
// and host2 unused. Keep that catch-all as a fallback for any other host name,
// then layer a dedicated stub per host on top (the most recent matching
// stubbing is the one Mockito applies).
when(fsm.getHost(anyString())).thenReturn(host3);
when(fsm.getHost(hostname1)).thenReturn(host1);
when(fsm.getHost(hostname2)).thenReturn(host2);
when(fsm.getHost(hostname3)).thenReturn(host3);
// Mocked DB accessor whose read methods are served from the in-memory
// stagesInProgress list built above.
ActionDBAccessor db = mock(ActionDBAccessor.class);
// The count is evaluated once, here, from the current size of the list.
when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
when(db.getStagesInProgress()).thenReturn(stagesInProgress);
// Flatten the host role commands of every stage into a single list ...
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
for (Stage stage : stagesInProgress) {
requestTasks.addAll(stage.getOrderedHostRoleCommands());
}
// ... and return that same list (and the full stage list) for any request id.
when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
when(db.getAllStages(anyLong())).thenReturn(stagesInProgress);
// Fake for db.updateHostRoleStates(...): instead of persisting anything, copy
// the status of every incoming report onto the matching in-memory command.
doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
        List<CommandReport> commandReports = (List<CommandReport>) invocation.getArguments()[0];
        for (CommandReport commandReport : commandReports) {
            // Element 0 is used as the request id and element 1 as the stage id.
            long[] requestAndStage = StageUtils.getRequestStage(commandReport.getActionId());
            Long reportedRequestId = requestAndStage[0];
            Long reportedStageId = requestAndStage[1];
            Long reportedTaskId = commandReport.getTaskId();
            for (Stage candidate : stagesInProgress) {
                // Skip stages that belong to another request or another stage id.
                if (!reportedRequestId.equals(candidate.getRequestId())
                        || !reportedStageId.equals(candidate.getStageId())) {
                    continue;
                }
                for (HostRoleCommand task : candidate.getOrderedHostRoleCommands()) {
                    if (task.getTaskId() == reportedTaskId) {
                        task.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
                    }
                }
            }
        }
        return null;
    }
}).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
// Fake for db.getTask(id): scan the in-memory stages and hand back the first
// command whose task id equals the requested one, or null when none matches.
when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
    @Override
    public Object answer(InvocationOnMock invocation) throws Throwable {
        Long wantedTaskId = (Long) invocation.getArguments()[0];
        for (Stage candidate : stagesInProgress) {
            for (HostRoleCommand task : candidate.getOrderedHostRoleCommands()) {
                if (!wantedTaskId.equals(task.getTaskId())) {
                    continue;
                }
                return task;
            }
        }
        return null;
    }
});
// Records which request ids were passed to db.startRequest(...): every call
// stores its first argument as a key mapped to true.
final Map<Long, Boolean> startedRequests = new HashMap<Long, Boolean>();
doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
        Long startedRequestId = (Long) invocation.getArguments()[0];
        startedRequests.put(startedRequestId, Boolean.TRUE);
        return null;
    }
}).when(db).startRequest(anyLong());
// One mocked RequestEntity per request id, each wired into the db mock right
// after it is configured. Only the second request is flagged as exclusive.
RequestEntity request1 = mock(RequestEntity.class);
when(request1.isExclusive()).thenReturn(false);
when(db.getRequestEntity(requestId1)).thenReturn(request1);

RequestEntity request2 = mock(RequestEntity.class);
when(request2.isExclusive()).thenReturn(true);
when(db.getRequestEntity(requestId2)).thenReturn(request2);

RequestEntity request3 = mock(RequestEntity.class);
when(request3.isExclusive()).thenReturn(false);
when(db.getRequestEntity(requestId3)).thenReturn(request3);
// Scheduler under test, built on an empty Properties-backed Configuration and
// the mocked collaborators (db, aq, fsm, unitOfWork).
// NOTE(review): the meaning of the numeric arguments (100, 50, 3) and of the
// null argument is not visible here -- check the ActionScheduler constructor.
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
// Make every spied stage report the same fixed last-attempt time, whatever two
// string arguments it is asked about.
doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress1).getLastAttemptTime(anyString(), anyString());
doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress2).getLastAttemptTime(anyString(), anyString());
doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress3).getLastAttemptTime(anyString(), anyString());
doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress4).getLastAttemptTime(anyString(), anyString());
After Change
// Ids of the second and third requests used by the test.
long requestId2 = 2;
long requestId3 = 3;
// Stages under test are collected in this list.
final List<Stage> stagesInProgress = new ArrayList<Stage>();
int namenodeCmdTaskId = 1;
// In this version the stages are added straight to the list, without a spy
// wrapper and without a named local per stage.
// Request 1: NAMENODE START on hostname1.
stagesInProgress.add(
getStageWithSingleTask(
hostname1, "cluster1", Role.NAMENODE, RoleCommand.START,
Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1));
// Request 1: DATANODE START on hostname1.
stagesInProgress.add(
getStageWithSingleTask(
hostname1, "cluster1", Role.DATANODE, RoleCommand.START,
Service.Type.HDFS, 2, 2, (int) requestId1));
stagesInProgress.add(
getStageWithSingleTask(
hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive